
http_client: Add Ability to Process Http Chunked Stream #8316

Merged · 2 commits merged into fluent:master from feature/http_chunked_stream on Mar 15, 2024

Conversation

@ryanohnemus (Contributor) commented on Dec 21, 2023

Currently, flb_http_do processes chunk-encoded responses but requires that all chunks be received before allowing any interaction with the response payload.

This change allows a user to only initiate the http request with flb_http_do_request and then process the stream of data with calls to flb_http_get_response_data.

This is needed to process HTTP/1.1 chunk-encoded streams that do not normally end (or are very long-lived). For example, Kubernetes watches use HTTP chunk-encoded streams that continuously send updates as chunks until the session expires.

A new flb_http internal code, FLB_HTTP_CHUNK_AVAILABLE, was added to signal that chunks have been received but more data is still expected from upstream. The data received so far is in client.resp.payload, but it is not guaranteed to be a complete application message; users implementing streamed chunk processing must apply their own end-of-message semantics and report progress through the bytes_consumed logic highlighted in the usage section below.

The stream's "end chunk" will continue to return FLB_HTTP_OK, which signifies the end of the chunked transfer.

Details on usage:

flb_http_do_request - a new method whose only purpose is to send the HTTP request; it does not process any response data.

flb_http_get_response_data - this handles the data receive and chunk processing that was originally part of flb_http_do, with the difference that it lets you pass in a bytes_consumed parameter. Any bytes_consumed are removed from the current HTTP payload before more data is read. This is extremely useful because it lets you process chunked streams using the internal HTTP buffer, without copying chunks outside of the HTTP connection. It also keeps the max-buffer-size logic inside the http client code, so the connection errors out if you have let the buffer grow too far.

The standard flb_http_do loops over flb_http_get_response_data, always passing bytes_consumed=0, since it does not consume any of the chunked data until the end chunk (end of stream), at which point the entire response is available in client.resp.payload. Future users (see the mocked usage example below) will be able to process parts of the payload as chunks become available and tell the http_client to clean up those bytes via bytes_consumed before it attempts to read more chunks from upstream.

I included documentation in the code, which I found to be a fairly clear way to understand how to use this method:

    /* returns 
     *  FLB_HTTP_MORE - if we are waiting for more data to be received
     *  FLB_HTTP_CHUNK_AVAILABLE - if this is a chunked transfer and one or more chunks
     *                 have been received and it is not the end of the stream
     *  FLB_HTTP_OK - if we have collected all response data and no errors were thrown 
     *                (in chunked transfers this means we've received the end chunk 
     *                and any remaining data to process from the end of stream, will be
     *                contained in the response payload)
     *  FLB_HTTP_ERROR - for any error
     */
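
To make those return codes concrete, the loop below is a minimal sketch of the polling pattern they imply. stream_response and consume_payload are hypothetical names, not part of this PR; consume_payload stands in for caller-specific end-of-message handling, and the request is assumed to have already been sent with flb_http_do_request:

/* hypothetical consumer: scans the buffered payload, handles any complete
 * messages, and returns how many bytes it fully processed */
static size_t consume_payload(const char *buf, size_t size);

static int stream_response(struct flb_http_client *c)
{
    int ret = FLB_HTTP_MORE;
    size_t bytes_consumed = 0;

    while (ret == FLB_HTTP_MORE || ret == FLB_HTTP_CHUNK_AVAILABLE) {
        ret = flb_http_get_response_data(c, bytes_consumed);
        bytes_consumed = 0;
        if (ret == FLB_HTTP_CHUNK_AVAILABLE) {
            /* one or more chunks are buffered but the stream is still open:
             * process complete messages and report the consumed bytes so the
             * client can release them from its internal buffer */
            bytes_consumed = consume_payload(c->resp.payload,
                                             c->resp.payload_size);
        }
    }

    /* FLB_HTTP_OK: end chunk seen; any trailing data is in c->resp.payload.
     * FLB_HTTP_ERROR: transport or buffer error. */
    return ret;
}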

flb_http_do - works the same as today; its internals were updated to call flb_http_do_request and then flb_http_get_response_data until the end chunk is received. One behavioral change: when the http_client cannot grow its buffer because doing so would exceed the max size, it now returns FLB_HTTP_ERROR instead of 0.
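
In other words, the original call decomposes into the two new primitives. Conceptually it now looks roughly like this (a sketch of the equivalence described above, not the literal source):

int flb_http_do(struct flb_http_client *c, size_t *b_sent)
{
    int ret;

    ret = flb_http_do_request(c, b_sent);
    if (ret != 0) {
        return ret;
    }

    /* never consume bytes mid-stream; wait until the full response
     * has been buffered in c->resp.payload */
    do {
        ret = flb_http_get_response_data(c, 0);
    } while (ret == FLB_HTTP_MORE || ret == FLB_HTTP_CHUNK_AVAILABLE);

    return (ret == FLB_HTTP_OK) ? 0 : -1;
}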

Code Example:
This is pseudocode-ish, but it works with my minikube k8s config (included later).

#define JSON_ARRAY_DELIM "\r\n"

static int process_http_chunk(struct k8s_events *ctx, struct flb_http_client *c,
                              size_t *bytes_consumed)
{
    int ret = 0;
    char *token = NULL;
    int root_type;
    size_t consumed = 0;
    char *buf_data = NULL;
    size_t buf_size;
    flb_sds_t payload;

    /* copy the payload: strtok writes NUL bytes into the buffer it scans */
    payload = flb_sds_create_len(c->resp.payload, c->resp.payload_size);

    token = strtok(payload, JSON_ARRAY_DELIM);
    while (token != NULL && ret == 0) {
        ret = flb_pack_json(token, strlen(token), &buf_data, &buf_size,
                            &root_type, &consumed);
        if (ret == -1) {
            /* not necessarily an error: we may be in the middle of a chunk */
            flb_plg_debug(ctx->ins, "could not process payload, incomplete "
                          "or malformed JSON: %s", token);
        }
        else {
            /* token plus one delimiter byte */
            *bytes_consumed += strlen(token) + 1;
            /* ret = process_watched_event(ctx, buf_data, buf_size); */
        }

        if (buf_data) {
            flb_free(buf_data);
        }
        buf_data = NULL;
        token = strtok(NULL, JSON_ARRAY_DELIM);
    }

    if (buf_data) {
        flb_free(buf_data);
    }
    flb_sds_destroy(payload);
    return ret;
}

static int k8s_events_collect(struct flb_input_instance *ins,
                              struct flb_config *config, void *in_context)
{
    /* ... */
    /* set up the http client */
    u_conn = flb_upstream_conn_get(ctx->upstream);
    c = flb_http_client(u_conn, FLB_HTTP_GET, "api/v1/events?watch=1",
                        NULL, 0, ctx->api_host, ctx->api_port, NULL, 0);
    flb_http_buffer_size(c, 0);
    flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
    if (ctx->auth_len > 0) {
        flb_http_add_header(c, "Authorization", 13, ctx->auth, ctx->auth_len);
    }

    /* The watch streams chunked json data, so we only send the http
     * request, then use flb_http_get_response_data to process the
     * streamed data as it becomes available */
    b_sent = 0;
    ret = flb_http_do_request(c, &b_sent);
    if (ret != 0) {
        flb_plg_error(ins, "http do error");
        goto exit;
    }

    ret = FLB_HTTP_MORE;
    size_t bytes_consumed = 0;
    while (ret == FLB_HTTP_MORE || ret == FLB_HTTP_CHUNK_AVAILABLE) {
        ret = flb_http_get_response_data(c, bytes_consumed);
        bytes_consumed = 0;
        if (ret == FLB_HTTP_CHUNK_AVAILABLE) {
            process_http_chunk(ctx, c, &bytes_consumed);
            /* TODO: handle the return value of process_http_chunk */
        }
    }
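    /* Sketching out the remaining TODO from this example (an assumption,
     * not part of this PR's diff): process any remaining payload on socket
     * close (FLB_HTTP_OK) or handle the error (FLB_HTTP_ERROR), then do
     * the standard http client / upstream connection teardown. */
    if (ret == FLB_HTTP_OK && c->resp.payload_size > 0) {
        /* socket closed cleanly: flush any trailing payload */
        process_http_chunk(ctx, c, &bytes_consumed);
    }
    else if (ret == FLB_HTTP_ERROR) {
        flb_plg_error(ins, "kubernetes watch stream failed");
    }

exit:
    flb_http_client_destroy(c);
    flb_upstream_conn_release(u_conn);
    return 0;
}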

Testing
Before we can approve your change, please submit the following in a comment:

  • Example configuration file for the change
[SERVICE]
    flush        1
    daemon       Off
    log_level    info
    http_server  Off
    http_listen  0.0.0.0
    http_port    2020

[INPUT]
    name          kubernetes_events
    tag           k8s_events
    kube_url      https://192.168.71.3:8443
    kube_ca_file  /Users/ryan/.minikube/ca.crt
    kube_token_file ./token
  • Debug log output from testing the change

  • Attached Valgrind output that shows no leaks or memory corruption was found

I ran run_code_analysis.sh with the following results:

100% tests passed, 0 tests failed out of 133

Label Time Summary:
internal    =  98.57 sec*proc (57 tests)
runtime     = 1607.98 sec*proc (68 tests)

Total Test time (real) = 1743.18 sec

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • Run local packaging test showing all targets (including any new ones) build.
  • Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • Documentation required for this feature

Backporting

  • Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

@ryanohnemus (Contributor, Author) commented on Dec 21, 2023

This is part 1 of addressing: #8315, which needs the ability to watch for k8s events from a long-lived chunked stream.

Part 2, for the in_kubernetes_events plugin, is still in draft as it depends on this PR, but it can be reviewed here: #8351

@@ -66,8 +67,6 @@ struct flb_http_response {
     int content_length;    /* Content length set by headers */
     int chunked_encoding;  /* Chunked transfer encoding ?   */
     int connection_close;  /* connection: close ?           */
-    long chunked_cur_size;
-    long chunked_exp_size; /* expected chunked size */
@ryanohnemus (Contributor, Author) commented:
These were completely unused except in a mocked AWS test, so I cleaned this up.

Current flb_http_do processes chunked streams, but
requires that all chunks are received before allowing
interaction with the response payload.

This change allows a user to only initiate the http
request with flb_http_do_request and then process
the live stream of data by fetching available chunks
with flb_http_get_available_chunks.

Signed-off-by: ryanohnemus <[email protected]>
@edsiper (Member) commented on Jan 10, 2024

thanks for this.

I suggest merging this for the 3.0 release (March) due to the API changes.

@edsiper edsiper added this to the Fluent Bit v3.0.0 milestone Jan 10, 2024
@edsiper (Member) commented on Mar 15, 2024

fixed minor conflict, waiting for CI

@edsiper (Member) commented on Mar 15, 2024

Note: the CI issue is a timing thing on macOS, not a real issue.

@edsiper edsiper merged commit dab232d into fluent:master Mar 15, 2024
41 of 42 checks passed
@ryanohnemus ryanohnemus deleted the feature/http_chunked_stream branch March 16, 2024 12:19